Assignment 03

Author
Affiliation

Thomas Primero

Boston University

Published

September 23, 2025

Modified

September 24, 2025

1 Load the Dataset

Load the Raw Dataset: Use PySpark to load the lightcast_data.csv file into a DataFrame. You can reuse the previous code. Copying code from your friend constitutes plagiarism. DO NOT DO THIS.

import pandas as pd
import plotly.express as px
import plotly.io as pio
from pyspark.sql import SparkSession
import re
import numpy as np
import plotly.graph_objects as go
from pyspark.sql.functions import col, split, explode, regexp_replace, transform, when
from pyspark.sql import functions as F
from pyspark.sql.functions import col, monotonically_increasing_id


np.random.seed(42)

pio.renderers.default = "notebook"

# Initialize Spark Session
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Load Data
df = spark.read.option("header", "true").option("inferSchema", "true").option("multiLine","true").option("escape", "\"").csv("/home/ubuntu/assignment-03-t-primero/data/lightcast_job_postings.csv")
df.createOrReplaceTempView("job_postings")

# Show Schema and Sample Data
# print("---This is Diagnostic check, No need to print it in the final doc---")

# df.printSchema() # comment this line when rendering the submission
# df.show(5)

2 Data Preparation

We convert the numerical columns to floats so that we can perform numeric operations on them, such as computing averages.

#  Step 1: Casting salary and experience columns
df = df.withColumn("SALARY", col("SALARY").cast("float")) \
     .withColumn("SALARY_FROM", col("SALARY_FROM").cast("float")) \
     .withColumn("SALARY_TO", col("SALARY_TO").cast("float")) \
     .withColumn("MIN_YEARS_EXPERIENCE", col("MIN_YEARS_EXPERIENCE").cast("float")) \
     .withColumn("MAX_YEARS_EXPERIENCE", col("MAX_YEARS_EXPERIENCE").cast("float"))

# Step 2: Computing medians for salary columns
def compute_median(sdf, col_name):
    q = sdf.approxQuantile(col_name, [0.5], 0.01)
    return q[0] if q else None

median_from = compute_median(df, "SALARY_FROM")
median_to = compute_median(df, "SALARY_TO")
median_salary = compute_median(df, "SALARY")

print("Medians:", median_from, median_to, median_salary)

# Step 3: Imputing missing salaries, but not experience
df = df.fillna({
    "SALARY_FROM": median_from,
    "SALARY_TO": median_to
})

# Step 4: Computing average salary
df = df.withColumn("Average_Salary", (col("SALARY_FROM") + col("SALARY_TO")) / 2)

# Step 5: Selecting required columns
export_cols = [
    "EDUCATION_LEVELS_NAME",
    "REMOTE_TYPE_NAME",
    "MAX_YEARS_EXPERIENCE",
    "Average_Salary",
    "LOT_V6_SPECIALIZED_OCCUPATION_NAME"
]
df_selected = df.select(*export_cols)

# Step 6: Saving to CSV
pdf = df_selected.toPandas()
pdf.to_csv("./data/lightcast_cleaned.csv", index=False)      

print("Data cleaning complete. Rows retained:", len(pdf))
Medians: 87295.0 130042.0 115024.0
Data cleaning complete. Rows retained: 72498
# Histogram of SALARY distribution
# salary_df = df.filter(col("SALARY").isNotNull() & (col("SALARY") > 0))
#fig = px.histogram(salary_df.toPandas(), x="SALARY", nbins=50, title="Salary Distribution")
#fig.update_layout(bargap=0.1)

3 Salary Distribution by Industry and Employment Type

Compare salary variations across industries.

- Filter the dataset: remove records where salary is missing or zero.
- Aggregate data: group by NAICS industry codes; group by employment type and compute the salary distribution.
- Visualize results: create a box plot where:
  - X-axis = ‘NAICS2_NAME’
  - Y-axis = ‘SALARY_FROM’, ‘SALARY_TO’, or ‘SALARY’
  - Group by ‘EMPLOYMENT_TYPE_NAME’
- Customize colors, fonts, and styles.
- Explanation: write two sentences about what the graph reveals.

# Your Code for 1st question here
import pandas as pd

# Filter out missing or zero salary values
pdf = df.filter(df["SALARY"] > 0).select("EMPLOYMENT_TYPE_NAME", "SALARY").toPandas()
# pdf.head()

# Clean employment type names for better readability
# Strip non-ASCII characters (symbols that were incorrectly embedded in the names)
pdf["EMPLOYMENT_TYPE_NAME"] = pdf["EMPLOYMENT_TYPE_NAME"].apply(lambda x: re.sub(r"[^\x00-\x7f]+", "", x))
# pdf.head()

# Compute median salary for sorting
median_salaries = pdf.groupby("EMPLOYMENT_TYPE_NAME")["SALARY"].median()
# median_salaries.head()

# Sort employment types based on median salary in descending order
sorted_employment_types = median_salaries.sort_values(ascending=False).index

# Apply sorted categories
pdf["EMPLOYMENT_TYPE_NAME"] = pd.Categorical(
    pdf["EMPLOYMENT_TYPE_NAME"],
    categories=sorted_employment_types,
    ordered=True
)

# Create box plot with horizontal grid lines
fig = px.box(
    pdf,
    x="EMPLOYMENT_TYPE_NAME",
    y="SALARY",
    title="Salary Distribution by Employment Type",
    color_discrete_sequence=["orange"],  # Single neutral color
    boxmode="group",
    points="all",  # Show all outliers
)
# Improve layout, font styles, and axis labels
fig.update_layout(
    title=dict(
        text="Salary Distribution by Employment Type",
        font=dict(size=26, family="Verdana", color="black", weight="bold")  # Bigger & Bold Title
    ),
    xaxis=dict(
        title=dict(text="Employment Type", font=dict(size=22, family="Verdana", color="black", weight="bold")),  # Bigger X-label
        tickangle=0,  # Rotate X-axis labels for readability
        tickfont=dict(size=18, family="Verdana", color="black", weight="bold"),  # Bigger & Bold X-ticks
        showline=True,  # Show axis lines
        linewidth=2,  # Thicker axis lines
        linecolor="black",
        mirror=True,
        showgrid=False,  # Remove vertical grid lines
        categoryorder="array",
        categoryarray=sorted_employment_types.tolist()
    ),
    yaxis=dict(
        title=dict(text="Salary (Thousands)", font=dict(size=22, family="Verdana", color="black", weight="bold")),  # Bigger Y-label
        tickvals=[0, 50000, 100000, 150000, 200000, 250000, 300000, 350000, 400000, 450000, 500000],
        ticktext=["0", "50", "100", "150", "200", "250", "300", "350", "400", "450", "500"],
        tickfont=dict(size=18, family="Verdana", color="black", weight="bold"),  # Bigger & Bold Y-ticks
        showline=True,
        linewidth=2,
        linecolor="black",
        mirror=True,
        showgrid=True,  # Enable light horizontal grid lines
        gridcolor="lightgray",  # Light shade for the horizontal grid
        gridwidth=0.5  # Thin grid lines
    ),
    font=dict(family="Verdana", size=16, color="black"),
    boxgap=0.5,
    plot_bgcolor="white",
    paper_bgcolor="white",
    showlegend=False,
    height=800,
    width=900
)

# Show the figure
fig.show()

4 Salary Distribution by Industry

pdf = df.select("NAICS2_NAME", "SALARY").toPandas()
fig = px.box(pdf, x="NAICS2_NAME", y="SALARY", title="Salary Distribution by Industry", color_discrete_sequence=["#EF553B"])
fig.update_layout(font_family="Arial", title_font_size=16,
                  height=1000,
                  width=1200)
# Rotate x-axis labels for readability
fig.update_xaxes(tickangle=45, tickfont=dict(size=12))
fig.show()

5 Salary Analysis by ONET Occupation Type (Bubble Chart)

Analyze how salaries differ across ONET occupation types.

- Aggregate data: compute the median salary for each occupation in the ONET taxonomy.
- Visualize results: create a bubble chart where:
  - X-axis = ONET_NAME
  - Y-axis = Median Salary
  - Size = Number of job postings
- Apply custom colors and font styles.
- Explanation: write two sentences about what the graph reveals.

# Defining education level groupings
lower_deg = ["Bachelor's", "Associate", "GED", "No Education Listed", "High school"]
higher_deg = ["Master's degree", "PhD or professional degree"]

# Adding EDU_GROUP column
df = df.withColumn(
    "EDU_GROUP",
    when(col("EDUCATION_LEVELS_NAME").rlike("|".join([f"(?i){deg}" for deg in lower_deg])), "Bachelor's or lower")
    .when(col("EDUCATION_LEVELS_NAME").rlike("|".join([f"(?i){deg}" for deg in higher_deg])), "Master's or PhD")
    .otherwise("Other")
)

# Casting necessary columns to float
df = df.withColumn("MAX_YEARS_EXPERIENCE", col("MAX_YEARS_EXPERIENCE").cast("float"))
df = df.withColumn("Average_Salary", col("Average_Salary").cast("float"))

# Filtering for non-null and positive values
df = df.filter(
    col("MAX_YEARS_EXPERIENCE").isNotNull() &
    col("Average_Salary").isNotNull() &
    (col("MAX_YEARS_EXPERIENCE") > 0) &
    (col("Average_Salary") > 0)
)

# Filtering for just the two education groups
df_filtered = df.filter(col("EDU_GROUP").isin("Bachelor's or lower", "Master's or PhD"))

# Converting to Pandas for plotting
df_pd = df_filtered.toPandas()
fig1 = px.scatter(
    df_pd,
    x="MAX_YEARS_EXPERIENCE",
    y="Average_Salary",
    color="EDU_GROUP",
    hover_data=["LOT_V6_SPECIALIZED_OCCUPATION_NAME"],
    title="<b>Experience vs Salary by Education Level</b>",
    opacity=0.7,
    color_discrete_sequence=["#636EFA", "#EF553B"]  # Blue, Red
)

fig1.update_traces(marker=dict(size=7, line=dict(width=1, color="black")))

fig1.update_layout(
    plot_bgcolor="#f0f0f0",  # light grey chart background
    paper_bgcolor="#FFF5DC",  # soft cream frame
    font=dict(family="Segoe UI", size=14),
    title_font=dict(size=22),
    xaxis_title="Years of Experience",
    yaxis_title="Average Salary (USD)",
    legend_title="Education Group",
    hoverlabel=dict(bgcolor="white", font_size=13, font_family="Arial"),
    margin=dict(t=70, b=60, l=60, r=60),
    xaxis=dict(
        gridcolor="lightgrey",
        tickmode='linear',
        dtick= 1 #show every integer year clearly
    ),
    yaxis=dict(gridcolor="lightgrey")
)

fig1.show()
fig1.write_html("output/q_1a_Experience_vs_Salary_by_Education_Level.html")

6 Salary by Remote Work Type

Split into three groups based on ‘REMOTE_TYPE_NAME’:

- Remote
- Hybrid
- Onsite (includes [None] and blank)

Plot scatter plots for each group using ‘MAX_YEARS_EXPERIENCE’ (with jitter), ‘Average_Salary’, and ‘LOT_V6_SPECIALIZED_OCCUPATION_NAME’. Also, create salary histograms for all three groups. After each graph, briefly describe any patterns or comparisons.

# Step 1: Create the Average_Salary column using SQL
remote_salary_data = spark.sql("""
SELECT 
    MAX_YEARS_EXPERIENCE,
    (SALARY_FROM + SALARY_TO) / 2 AS Average_Salary,
    LOT_V6_SPECIALIZED_OCCUPATION_NAME,
    CASE 
        WHEN REMOTE_TYPE_NAME = 'Remote' THEN 'Remote'
        WHEN REMOTE_TYPE_NAME = 'Hybrid' THEN 'Hybrid'
        ELSE 'Onsite'
    END AS REMOTE_GROUP
FROM job_postings
WHERE SALARY_FROM IS NOT NULL 
  AND SALARY_TO IS NOT NULL
  AND SALARY_FROM > 0
  AND SALARY_TO > 0
  AND MAX_YEARS_EXPERIENCE IS NOT NULL
""")

# Step 2: Convert to pandas for plotting
df_viz = remote_salary_data.toPandas()

# Step 3: Create scatter plot (rest of code same as before)
import plotly.express as px
import numpy as np

np.random.seed(42)
df_viz['EXPERIENCE_JITTER'] = df_viz['MAX_YEARS_EXPERIENCE'] + np.random.uniform(-0.3, 0.3, len(df_viz))

fig_scatter = px.scatter(
    df_viz,
    x='EXPERIENCE_JITTER',
    y='Average_Salary',
    color='REMOTE_GROUP',
    hover_data=['LOT_V6_SPECIALIZED_OCCUPATION_NAME'],
    title='Salary by Experience and Remote Work Type'
)

fig_scatter.show()